package Source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties
import scala.collection.immutable
import scala.util.Random

// 物联网领域采集传感器数据

// 定义样例类，温度传感器
//  样例类的好处：定义简单，构造器直接把参数传进去，里面就有对应的属性了，而且会自动生成伴生对象abbly，可以做模式匹配，非常方便


object SourceTest {
  def main(args: Array[String]): Unit = {
    // source 测试
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 11.从集合中读取数据
    val sensorReadings = List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10", 1547718205, 38.1)
    )

    val ds1: DataStream[SensorReading] = env.fromCollection(sensorReadings)
//    ds1.print()

    // 2.从文件中读取数据
    val inputPath = "E:\\Flinklearn\\Flink\\resources\\data1\\sensor.txt"
    val ds2: DataStream[String] = env.readTextFile(inputPath)
//    ds2.print()

    // 3.从kafka中读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","master:9092")
    properties.setProperty("group.id","consumer-group")
    val ds3: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

    ds3.print("sensor Hi,kafka")



    // 4.自定义source

    env.setParallelism(1)
    val ds4: DataStream[SensorReading] = env.addSource(new MySensorSource())
//    ds4.print("自定义source")


    env.execute("source Test")

  }
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // 4.自定义SourceFunction
  // 好处：1.做测试 随机生成数据 2.从其他地方读取数据
  class MySensorSource() extends SourceFunction[SensorReading] {
    // 1.定义一个标识位，用来表示数据源是否正常运行发出数据
    var running: Boolean = true


    override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
      // 2.定义一个随机数发生器
      val random = new Random()

      // 3.随机生成一组（10个）传感器的初始温度:(id,temp)
      val curTemp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map(i => ("sensor_" + i, random.nextDouble() * 100))

      // 4.定义无限循环，不停地产生数据，除非被cancel
      while (running) {
        // 5.在上次数据基础上微调，更新温度值
        val Temp: immutable.IndexedSeq[(String, Double)] = curTemp.map(
          data => (data._1, data._2 + random.nextGaussian())
        )

        // 6.获取当前时间戳，加入到数据中,调用stx.collect发出数据
        val curTime: Long = System.currentTimeMillis()
        Temp.foreach(
          data => sourceContext.collect(SensorReading(data._1, curTime, data._2))
        )
        // 间隔
        Thread.sleep(500)

      }

    }

    override def cancel(): Unit = running = false
  }

}
